/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventhubs

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/Azure/azure-amqp-common-go/v3/aad"
	"github.com/Azure/azure-amqp-common-go/v3/conn"
	eventhub "github.com/Azure/azure-event-hubs-go/v3"
	"github.com/Azure/azure-event-hubs-go/v3/eph"
	"github.com/Azure/azure-event-hubs-go/v3/storage"
	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/azure"

	"github.com/dapr/kit/logger"
	azauth "github.com/liuxd6825/components-contrib/authentication/azure"
	"github.com/liuxd6825/components-contrib/bindings"
)

const (
	// Metadata property key holding the Event Hubs connection string. When it is
	// absent or empty, Init connects through Azure AD instead.
	connectionString = "connectionString"

	// Metadata property keys validated by parseMetadata: consumerGroup,
	// storageAccountName and storageContainerName must always be non-empty, while
	// storageAccountKey is only mandatory when connectionString is set.
	consumerGroup        = "consumerGroup"
	storageAccountName   = "storageAccountName"
	storageAccountKey    = "storageAccountKey"
	storageContainerName = "storageContainerName"

	// Optional metadata property keys. eventHub and eventHubNamespace become
	// mandatory when no connectionString is supplied.
	partitionKeyName = "partitionKey"
	partitionIDName  = "partitionID"
	hubName          = "eventHub"
	hubNamespaceName = "eventHubNamespace"

	// Error messages returned by Init and parseMetadata.
	hubConnectionInitErrorMsg           = "error: creating eventHub hub client"
	invalidConnectionStringErrorMsg     = "error: connectionString is invalid"
	missingHubNamespaceErrorMsg         = "error: connectionString or eventHubNamespace is required"
	missingHubNameErrorMsg              = "error: connectionString or eventHub is required"
	missingStorageAccountNameErrorMsg   = "error: storageAccountName is a required attribute"
	missingStorageAccountKeyErrorMsg    = "error: storageAccountKey is a required attribute when connectionString is provided"
	missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute"
	missingConsumerGroupErrorMsg        = "error: consumerGroup is a required attribute"

	// Keys under which readHandler exposes an event's system properties in the
	// ReadResponse metadata.
	sysPropSequenceNumber             = "x-opt-sequence-number"
	sysPropEnqueuedTime               = "x-opt-enqueued-time"
	sysPropOffset                     = "x-opt-offset"
	sysPropPartitionID                = "x-opt-partition-id"
	sysPropPartitionKey               = "x-opt-partition-key"
	sysPropIotHubDeviceConnectionID   = "iothub-connection-device-id"
	sysPropIotHubAuthGenerationID     = "iothub-connection-auth-generation-id"
	sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method"
	sysPropIotHubConnectionModuleID   = "iothub-connection-module-id"
	sysPropIotHubEnqueuedTime         = "iothub-enqueuedtime"
	sysPropMessageID                  = "message-id"
)

// readHandler converts a received Event Hubs event into a bindings.ReadResponse
// and hands it to handler.
//
// The event payload becomes the response data. Every system property that is
// set on the event is copied into the response metadata under its sysProp* key,
// and the event ID is exposed as the message-id entry. The bytes returned by
// handler are discarded; only its error is propagated to the caller.
//
// A nil event is ignored and yields a nil error.
func readHandler(e *eventhub.Event, handler func(*bindings.ReadResponse) ([]byte, error)) error {
	if e == nil {
		return nil
	}

	res := bindings.ReadResponse{Data: e.Data, Metadata: map[string]string{}}

	// SystemProperties is a pointer and may be nil (e.g. an event that carried no
	// annotations), so it must be checked before any of its fields are read.
	if sp := e.SystemProperties; sp != nil {
		if sp.SequenceNumber != nil {
			res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*sp.SequenceNumber, 10)
		}
		if sp.EnqueuedTime != nil {
			res.Metadata[sysPropEnqueuedTime] = sp.EnqueuedTime.Format(time.RFC3339)
		}
		if sp.Offset != nil {
			res.Metadata[sysPropOffset] = strconv.FormatInt(*sp.Offset, 10)
		}
		// According to azure-event-hubs-go docs, this will always be nil.
		if sp.PartitionID != nil {
			res.Metadata[sysPropPartitionID] = strconv.Itoa(int(*sp.PartitionID))
		}
		if sp.PartitionKey != nil {
			res.Metadata[sysPropPartitionKey] = *sp.PartitionKey
		}

		// The following metadata properties are only present if event was generated by Azure IoT Hub.
		if sp.IoTHubDeviceConnectionID != nil {
			res.Metadata[sysPropIotHubDeviceConnectionID] = *sp.IoTHubDeviceConnectionID
		}
		if sp.IoTHubAuthGenerationID != nil {
			res.Metadata[sysPropIotHubAuthGenerationID] = *sp.IoTHubAuthGenerationID
		}
		if sp.IoTHubConnectionAuthMethod != nil {
			res.Metadata[sysPropIotHubConnectionAuthMethod] = *sp.IoTHubConnectionAuthMethod
		}
		if sp.IoTHubConnectionModuleID != nil {
			res.Metadata[sysPropIotHubConnectionModuleID] = *sp.IoTHubConnectionModuleID
		}
		if sp.IoTHubEnqueuedTime != nil {
			res.Metadata[sysPropIotHubEnqueuedTime] = sp.IoTHubEnqueuedTime.Format(time.RFC3339)
		}
	}

	// azure-event-hubs-go SDK pulls out the AMQP message-id property to the Event.ID property, map it from there.
	if e.ID != "" {
		res.Metadata[sysPropMessageID] = e.ID
	}

	_, err := handler(&res)

	return err
}

// AzureEventHubs allows sending/receiving Azure Event Hubs events.
// An instance is created with NewAzureEventHubs and must be initialised with
// Init before Invoke or Read is called.
type AzureEventHubs struct {
	// hub is the Event Hubs client created by Init; it is used to send events,
	// to receive from a single partition and is closed by Close.
	hub *eventhub.Hub
	// metadata is the parsed binding configuration.
	metadata *azureEventHubsMetadata
	// eventHubSettings is only populated by Init when no connection string is
	// configured (Azure AD path).
	eventHubSettings azauth.EnvironmentSettings
	// tokenProvider is the Azure AD token provider obtained from
	// eventHubSettings; it stays nil when a connection string is used.
	tokenProvider *aad.TokenProvider
	// storageCredential and azureEnvironment are resolved by Init and passed to
	// the storage leaser/checkpointer in RegisterEventProcessor.
	storageCredential azblob.Credential
	azureEnvironment  *azure.Environment
	logger            logger.Logger
	// userAgent is set by Init to "dapr-" followed by the Dapr version and is
	// passed to the hub client.
	userAgent string
}

// azureEventHubsMetadata holds the binding configuration extracted from the
// component metadata properties by parseMetadata.
type azureEventHubsMetadata struct {
	// connectionString is the Event Hubs connection string; empty when Azure AD
	// authentication is used.
	connectionString string
	// consumerGroup is the consumer group used when receiving.
	consumerGroup string
	// storageAccountName, storageAccountKey and storageContainerName identify
	// the storage used by the event processor host.
	storageAccountName   string
	storageAccountKey    string
	storageContainerName string
	// partitionID, when set, pins sending and receiving to a single partition.
	partitionID string
	// partitionKey, when set, is attached to every outgoing event.
	partitionKey string
	// eventHubName and eventHubNamespaceName address the hub on the Azure AD path.
	eventHubName          string
	eventHubNamespaceName string
}

// partitioned reports whether a specific partition ID has been configured.
func (m azureEventHubsMetadata) partitioned() bool {
	return len(m.partitionID) > 0
}

// NewAzureEventHubs builds an uninitialised Azure Event Hubs binding that logs
// through the supplied logger. Init must be called before the binding is used.
func NewAzureEventHubs(logger logger.Logger) *AzureEventHubs {
	binding := new(AzureEventHubs)
	binding.logger = logger

	return binding
}

// validate reports whether connectionString can be parsed as a connection
// string, returning the parser's error when it cannot.
func validate(connectionString string) error {
	if _, parseErr := conn.ParsedConnectionFromStr(connectionString); parseErr != nil {
		return parseErr
	}

	return nil
}

// Init parses the binding metadata, creates the Event Hubs client and resolves
// the storage credentials used later by the event processor host.
//
// When a connection string is configured it is validated and used to build the
// hub client (with a partitioned sender if partitionID is set). Otherwise the
// client is created from the namespace and hub name using an Azure AD token
// provider derived from the metadata properties.
//
// Note that, when storageAccountKey is configured, it is written into
// metadata.Properties under the "accountKey" key before the storage
// credentials are resolved.
//
// An error is returned if the metadata is invalid, the hub client cannot be
// created, or the storage credentials cannot be obtained.
func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
	m, err := parseMetadata(metadata)
	if err != nil {
		return err
	}

	a.userAgent = "dapr-" + logger.DaprVersion

	a.metadata = m
	if a.metadata.connectionString != "" {
		// Validate connectionString.
		validateErr := validate(a.metadata.connectionString)
		if validateErr != nil {
			return errors.New(invalidConnectionStringErrorMsg)
		}

		var hub *eventhub.Hub
		// Create partitioned sender if the partitionID is configured.
		if a.metadata.partitioned() {
			hub, err = eventhub.NewHubFromConnectionString(a.metadata.connectionString,
				eventhub.HubWithPartitionedSender(a.metadata.partitionID),
				eventhub.HubWithUserAgent(a.userAgent),
			)
		} else {
			hub, err = eventhub.NewHubFromConnectionString(a.metadata.connectionString,
				eventhub.HubWithUserAgent(a.userAgent),
			)
		}

		if err != nil {
			return fmt.Errorf("unable to connect to azure event hubs: %w", err)
		}
		a.hub = hub
	} else {
		// Connect via AAD.
		settings, sErr := azauth.NewEnvironmentSettings("eventhubs", metadata.Properties)
		if sErr != nil {
			return sErr
		}
		a.eventHubSettings = settings
		tokenProvider, tokenErr := a.eventHubSettings.GetAADTokenProvider()
		if tokenErr != nil {
			// Wrap tokenErr, not err: err is nil here, so wrapping it would drop
			// the real cause.
			return fmt.Errorf("%s %w", hubConnectionInitErrorMsg, tokenErr)
		}
		a.tokenProvider = tokenProvider

		a.hub, err = eventhub.NewHub(a.metadata.eventHubNamespaceName, a.metadata.eventHubName, a.tokenProvider, eventhub.HubWithUserAgent(a.userAgent))
		if err != nil {
			return fmt.Errorf("unable to connect to azure event hubs: %w", err)
		}
	}

	// connect to the storage account.
	if m.storageAccountKey != "" {
		metadata.Properties["accountKey"] = m.storageAccountKey
	}
	a.storageCredential, a.azureEnvironment, err = azauth.GetAzureStorageCredentials(a.logger, m.storageAccountName, metadata.Properties)
	if err != nil {
		return fmt.Errorf("invalid credentials with error: %w", err)
	}

	return nil
}

// parseMetadata extracts and validates the binding configuration from
// meta.Properties.
//
// storageAccountName, storageContainerName and consumerGroup must always be
// non-empty. storageAccountKey must be non-empty when connectionString is set,
// and eventHub and eventHubNamespace must be non-empty when it is not.
// partitionKey and partitionID are optional.
//
// Checks run in a fixed order and the first failure is reported; in that case
// the metadata populated so far is returned together with the error.
func parseMetadata(meta bindings.Metadata) (*azureEventHubsMetadata, error) {
	props := meta.Properties
	m := &azureEventHubsMetadata{}

	// Indexing a map yields "" for a missing key, so a missing key and an empty
	// value are treated the same way throughout.
	m.connectionString = props[connectionString]

	if m.storageAccountName = props[storageAccountName]; m.storageAccountName == "" {
		return m, errors.New(missingStorageAccountNameErrorMsg)
	}

	if m.storageAccountKey = props[storageAccountKey]; m.storageAccountKey == "" && m.connectionString != "" {
		return m, errors.New(missingStorageAccountKeyErrorMsg)
	}

	if m.storageContainerName = props[storageContainerName]; m.storageContainerName == "" {
		return m, errors.New(missingStorageContainerNameErrorMsg)
	}

	if m.consumerGroup = props[consumerGroup]; m.consumerGroup == "" {
		return m, errors.New(missingConsumerGroupErrorMsg)
	}

	m.partitionKey = props[partitionKeyName]
	m.partitionID = props[partitionIDName]

	// Without a connection string the hub is addressed by namespace and name, so
	// an empty value is as unusable as a missing key and is rejected up front.
	if m.eventHubName = props[hubName]; m.eventHubName == "" && m.connectionString == "" {
		return m, errors.New(missingHubNameErrorMsg)
	}

	if m.eventHubNamespaceName = props[hubNamespaceName]; m.eventHubNamespaceName == "" && m.connectionString == "" {
		return m, errors.New(missingHubNamespaceErrorMsg)
	}

	return m, nil
}

// Operations lists the operation kinds this binding accepts through Invoke;
// only the create operation is supported.
func (a *AzureEventHubs) Operations() []bindings.OperationKind {
	supported := []bindings.OperationKind{
		bindings.CreateOperation,
	}

	return supported
}

// Invoke sends the request payload to the hub as a single event.
//
// The partition key configured in the binding metadata takes precedence; when
// none is configured, a non-empty "partitionKey" entry in the request metadata
// is used instead. The response is always nil; the error is whatever the hub
// client reports for the send.
func (a *AzureEventHubs) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	outgoing := &eventhub.Event{Data: req.Data}

	switch {
	case a.metadata.partitionKey != "":
		// Binding-level partition key wins over the per-request one.
		outgoing.PartitionKey = &a.metadata.partitionKey
	default:
		// A missing entry reads as "", so one emptiness check covers both cases.
		if requestKey := req.Metadata[partitionKeyName]; requestKey != "" {
			outgoing.PartitionKey = &requestKey
		}
	}

	if sendErr := a.hub.Send(context.Background(), outgoing); sendErr != nil {
		return nil, sendErr
	}

	return nil, nil
}

// Read starts receiving events and delivers each one to handler.
//
// When a partition ID is configured, events are received from that partition
// only; otherwise an event processor host is registered. Once the receiver has
// been registered, Read blocks until the process receives an interrupt or
// SIGTERM signal, then closes the hub client and returns nil.
//
// An error is returned only if registering the receiver fails. A failure to
// close the hub client on shutdown is logged rather than returned.
func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
	if !a.metadata.partitioned() {
		if err := a.RegisterEventProcessor(handler); err != nil {
			return err
		}
	} else {
		if err := a.RegisterPartitionedEventProcessor(handler); err != nil {
			return err
		}
	}

	// close Event Hubs when application exits.
	exitChan := make(chan os.Signal, 1)
	signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
	// Stop relaying signals to exitChan once Read returns.
	defer signal.Stop(exitChan)
	<-exitChan

	// Shutdown is best effort, but a failed close should not go unnoticed.
	if err := a.Close(); err != nil {
		a.logger.Warnf("eventhubs: error closing hub client: %v", err)
	}

	return nil
}

// RegisterPartitionedEventProcessor - receive eventhub messages by partitionID.
//
// It queries the hub's runtime information, verifies that the configured
// partition ID exists and then starts a receiver on that partition, beginning
// at the latest offset and using the configured consumer group when one is set.
// Every non-nil event is passed to handler through readHandler.
//
// An error is returned if the runtime information cannot be fetched, if the
// configured partition ID is not one of the hub's partitions, or if the
// receiver cannot be started.
func (a *AzureEventHubs) RegisterPartitionedEventProcessor(handler func(*bindings.ReadResponse) ([]byte, error)) error {
	ctx := context.Background()

	runtimeInfo, err := a.hub.GetRuntimeInformation(ctx)
	if err != nil {
		return err
	}

	// Fail fast on an unknown partition: otherwise no receiver is started and
	// the binding silently never delivers an event.
	if !contains(runtimeInfo.PartitionIDs, a.metadata.partitionID) {
		return fmt.Errorf("eventhubs: partition id %q not found, available partitions: %v", a.metadata.partitionID, runtimeInfo.PartitionIDs)
	}

	callback := func(c context.Context, event *eventhub.Event) error {
		if event != nil {
			return readHandler(event, handler)
		}

		return nil
	}

	ops := []eventhub.ReceiveOption{
		eventhub.ReceiveWithLatestOffset(),
	}

	if a.metadata.consumerGroup != "" {
		a.logger.Infof("eventhubs: using consumer group %s", a.metadata.consumerGroup)
		ops = append(ops, eventhub.ReceiveWithConsumerGroup(a.metadata.consumerGroup))
	}

	a.logger.Infof("eventhubs: using partition id %s", a.metadata.partitionID)

	if _, err = a.hub.Receive(ctx, a.metadata.partitionID, callback, ops...); err != nil {
		return err
	}

	return nil
}

// contains reports whether str is equal to at least one element of arr.
// A nil or empty slice never contains anything.
func contains(arr []string, str string) bool {
	for i := 0; i < len(arr); i++ {
		if arr[i] != str {
			continue
		}

		return true
	}

	return false
}

// RegisterEventProcessor starts an event processor host that delivers every
// received event to handler through readHandler.
//
// A storage-backed leaser/checkpointer is created from the credentials resolved
// during Init. The host is built from the connection string when one is
// configured, and from the namespace, hub name and Azure AD token provider
// otherwise. The host is started without blocking the caller.
//
// Any error from creating the checkpointer, creating the host, registering the
// handler or starting the host is returned unchanged.
func (a *AzureEventHubs) RegisterEventProcessor(handler func(*bindings.ReadResponse) ([]byte, error)) error {
	ctx := context.Background()
	cfg := a.metadata

	checkpointStore, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, cfg.storageAccountName, cfg.storageContainerName, *a.azureEnvironment)
	if err != nil {
		return err
	}

	var host *eph.EventProcessorHost
	if cfg.connectionString == "" {
		// AAD connection.
		host, err = eph.New(ctx, cfg.eventHubNamespaceName, cfg.eventHubName, a.tokenProvider, checkpointStore, checkpointStore, eph.WithNoBanner(), eph.WithConsumerGroup(cfg.consumerGroup))
		if err != nil {
			return err
		}
		a.logger.Debugf("processor initialized via AAD for eventHubName %s", cfg.eventHubName)
	} else {
		host, err = eph.NewFromConnectionString(ctx, cfg.connectionString, checkpointStore, checkpointStore, eph.WithNoBanner(), eph.WithConsumerGroup(cfg.consumerGroup))
		if err != nil {
			return err
		}
	}

	onEvent := func(_ context.Context, received *eventhub.Event) error {
		return readHandler(received, handler)
	}
	if _, err = host.RegisterHandler(ctx, onEvent); err != nil {
		return err
	}

	return host.StartNonBlocking(ctx)
}

// Close closes the underlying Event Hubs client and returns the error it
// reports, if any.
func (a *AzureEventHubs) Close() error {
	ctx := context.Background()

	return a.hub.Close(ctx)
}
